// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-06-05 09:23:34

package command

import (
	"context"
	"encoding/json"
	"os"
	"os/signal"
	"syscall"
	"time"

	"jianmu-worker-kube/client"
	"jianmu-worker-kube/engine"
	"jianmu-worker-kube/poller"
	"jianmu-worker-kube/runtime"
	"jianmu-worker-kube/untils/kube"

	"github.com/joho/godotenv"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

type DaemonCmd struct {
	Envfile string `arg:"" optional:"" name:"envfile" help:"load the environment variable file." type:"path"`
}

// 空context.
var emptyContext = context.Background()

func (d *DaemonCmd) run(ctx context.Context, config Config) error {
	cli := client.New(
		config.Client.Address,
		config.Client.Secret,
		false,
		config.Worker.ID,
	)

	var kubeClient kubernetes.Interface
	var rest_cfg *rest.Config
	var err error

	if path := config.Worker.Config; path != "" {
		// if the configuration path is specified, we create
		// the kubernetes client from the configuration file.
		// This is used primarily for local out-of-cluster
		// testing.
		kubeClient, err = kube.NewFromConfig((*kube.ClientConfig)(&config.KubernetesClient), path)
		if err != nil {
			logrus.WithError(err).
				Fatalln("cannot load the kubernetes client from config")
		}
		rest_cfg, err = clientcmd.BuildConfigFromFlags("", path)
		if err != nil {
			logrus.WithError(err).
				Fatalln("cannot create rest config")
		}
	} else {
		// else, if no configuration is specified, we create
		// the kubernetes client using the in-cluster
		// configuration file.
		kubeClient, err = kube.NewInCluster((*kube.ClientConfig)(&config.KubernetesClient))
		if err != nil {
			logrus.WithError(err).
				Fatalln("cannot load the in-cluster kubernetes client")
		}
		if rest_cfg, err = rest.InClusterConfig(); err != nil {
			logrus.WithError(err).
				Fatalln("cannot create rest config")
		}
	}

	kubeEngine := engine.New(kubeClient,
		rest_cfg,
		time.Duration(config.Engine.ContainerStartTimeout)*time.Second)

	worker := &runtime.Worker{
		Client:    cli,
		Engine:    kubeEngine,
		ID:        config.Worker.ID,
		Name:      config.Worker.Name,
		Type:      config.Worker.Type,
		Namespace: config.Worker.Namespace,
	}
	poller := &poller.Poller{
		Client:   cli,
		Dispatch: worker.Run,
		Filter:   nil,
	}
	var g errgroup.Group

	// 循环Ping服务器直到成功
	for {
		err := cli.Ping(ctx)
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		if ctx.Err() != nil {
			break
		}
		if err != nil {
			logrus.WithError(err).
				Errorln("cannot ping the remote server")
			time.Sleep(time.Second)
		} else {
			logrus.Debugln("successfully pinged the remote server")
			break
		}
	}
	w := &engine.Worker{
		ID:   config.Worker.ID,
		Name: config.Worker.Name,
		Type: config.Worker.Type,
		Tag:  config.Worker.Tags,
	}
	if err := cli.Join(ctx, w); err != nil {
		logrus.WithError(err).
			Errorln("cannot join the server")
	}
	pods, err := worker.Engine.FindPods(ctx, config.Worker.Namespace)
	if err != nil {
		logrus.Debugln("获取遗留任务失败")
	}
	for i := 0; i < len(pods); i++ {
		logrus.Debugln(pods[i])
		r, err := cli.FindById(ctx, pods[i])
		if err != nil {
			logrus.Debugln("获取遗留任务状态失败")
			worker.Engine.DeletePod(ctx, &engine.Unit{
				PodSpec: engine.PodSpec{
					Name:      pods[i],
					Namespace: config.Worker.Namespace,
				},
			})
		}
		r.PodSpec.Namespace = config.Worker.Namespace
		s, e := json.Marshal(r)
		if e != nil {
			logrus.WithError(e).Errorln("序列化错误")
		}
		logrus.Debugln(string(s))
		worker.Resume(ctx, r)
	}

	g.Go(func() error {
		logrus.WithField("capacity", config.Worker.Capacity).
			WithField("endpoint", config.Client.Address).
			WithField("os", config.Platform.OS).
			WithField("arch", config.Platform.Arch).
			Infoln("polling the remote server")

		poller.Poll(ctx, config.Worker.Capacity)
		return nil
	})

	err = g.Wait()
	if err != nil {
		logrus.WithError(err).
			Errorln("shutting down the server")
	}
	return err
}

func (d *DaemonCmd) Run() error {
	// load environment variables from file.
	godotenv.Load(d.Envfile)

	// load the configuration from the environment.
	config, err := FromEnviron()
	if err != nil {
		return err
	}

	// setup the global logrus logger.
	setupLogger(config)

	ctx, cancel := context.WithCancel(emptyContext)
	defer cancel()

	// 侦听终止信号实现正常退出
	ctx = WithContextFunc(ctx, func() {
		println("received signal, terminating process")
		cancel()
	})

	return d.run(ctx, config)
}

func setupLogger(config Config) {
	if config.Debug {
		logrus.SetLevel(logrus.DebugLevel)
	}
	if config.Trace {
		logrus.SetLevel(logrus.TraceLevel)
	}
}

// WithContextFunc 收到OS中断信号时调用回调函数f，之后调用cancel
func WithContextFunc(ctx context.Context, f func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(c)

		select {
		case <-ctx.Done():
		case <-c:
			f()
			cancel()
		}
	}()

	return ctx
}
